In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, col, udf, row_number,avg, desc
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql import Window
from pyspark.ml.feature import BucketedRandomProjectionLSH, Normalizer
import plotly.express as px
import seaborn as sns
In [2]:
# Create a SparkSession
spark = SparkSession.builder \
    .appName("Data Preprocessing") \
    .getOrCreate()
24/05/09 22:53:43 WARN Utils: Your hostname, Soumitras-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 10.0.0.227 instead (on interface en0)
24/05/09 22:53:43 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/09 22:53:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
In [3]:
# Load the datasets from hadoop
file_path1 = "hdfs://localhost:9000/datafolder/datazip/Books_rating.csv"
file_path2 = "hdfs://localhost:9000/datafolder/datazip/books_data.csv"

books_rating = spark.read.csv(file_path1, header=True, inferSchema=True)
books_data = spark.read.csv(file_path2, header=True, inferSchema=True)
In [5]:
# Considering only the required columns
books_rating = books_rating[['Id','Title','User_id','review/score','Price']] 
books_data = books_data[['Title','authors','publisher','categories','publishedDate']]

books_rating.describe().show()
books_data.describe().show()
24/05/09 22:53:51 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                
+-------+--------------------+--------------------+-------------------+------------------+--------------------+
|summary|                  Id|               Title|            User_id|      review/score|               Price|
+-------+--------------------+--------------------+-------------------+------------------+--------------------+
|  count|             3000000|             2999792|            2437750|           2999870|              482421|
|   mean|1.0568515696607149E9|   2012.796651763537|  18.29299003322259| 1656.860421970827|  21.767951161877054|
| stddev| 1.284488524833734E9|  1536.7533549608797|  21.99284402625621|1427549.9863179324|   26.21155241772817|
|    min|          0001047604|  """ Film technique| "" Film acting """|   & Algorithms"""|              "" and|
|    max|          B0064P287I|you can do anythi...|      AZZZZW74AAX75|         thersites|: A guide to loca...|
+-------+--------------------+--------------------+-------------------+------------------+--------------------+

[Stage 7:>                                                          (0 + 8) / 8]
+-------+--------------------+--------------------+--------------------+--------------------+--------------------+
|summary|               Title|             authors|           publisher|          categories|       publishedDate|
+-------+--------------------+--------------------+--------------------+--------------------+--------------------+
|  count|              212403|              181153|              139274|              171880|              186560|
|   mean|   3823.672941176471|              1578.4|             3734.75|  1983.7334777898159|   1982.702933143332|
| stddev|  10717.999589636447|  1278.7901502106834|  10193.316327911616|  142.43423125699238|   37.65620052385513|
|    min|  """ Film technique| "" ""I'm a Littl...| "" ""Skipper Ire...| "" Knox's quirky...| "" ""Cruising fo...|
|    max|you can do anythi...|” “Jeanie with th...|                펜립|�� folk art is a ...|” which is anthol...|
+-------+--------------------+--------------------+--------------------+--------------------+--------------------+

                                                                                
In [6]:
# Extract only the year from the publishedDate column
from pyspark.sql.functions import year
books_data = books_data.withColumn("publishedYear", year("publishedDate")).drop("publishedDate")
books_data = books_data.filter(col("publishedYear").rlike("^\d+$"))
In [7]:
# dropping duplicate rows in both the tables

count_original = books_rating.count() 
books_rating.dropDuplicates()
count_after = books_rating.count() 
d1 = count_original - count_after
print("number of duplicates in books_rating:", d1)
count_original2 = books_data.count() 
books_data.dropDuplicates()
count_after2 = books_data.count() 
d2 = count_original2 - count_after2
print("number of duplicates in books_data:", d2)
                                                                                
number of duplicates in books_rating: 0
number of duplicates in books_data: 0
In [8]:
# Create a dictionary to display the count of null values for each column
null_counts_rating = {col_name: books_rating.filter(col(col_name).isNull()).count() for col_name in books_rating.columns}
null_counts_data = {col_name: books_data.filter(col(col_name).isNull()).count() for col_name in books_data.columns}
print("Null values in books_rating dataset:", null_counts_rating)
print("Null values in books_data dataset:", null_counts_data)
Java HotSpot(TM) 64-Bit Server VM warning: CodeCache is full. Compiler has been disabled.
Java HotSpot(TM) 64-Bit Server VM warning: Try increasing the code cache size using -XX:ReservedCodeCacheSize=
[Stage 31:====================>                                    (8 + 8) / 22]
CodeCache: size=131072Kb used=34992Kb max_used=35006Kb free=96079Kb
 bounds [0x0000000104934000, 0x0000000106ba4000, 0x000000010c934000]
 total_blobs=13261 nmethods=12319 adapters=854
 compilation: disabled (not enough contiguous free space left)
                                                                                
Null values in books_rating dataset: {'Id': 0, 'Title': 208, 'User_id': 562250, 'review/score': 130, 'Price': 2517579}
Null values in books_data dataset: {'Title': 1, 'authors': 6839, 'publisher': 48179, 'categories': 15259, 'publishedYear': 0}
In [9]:
from pyspark.sql.functions import when, col, first
from pyspark.sql.window import Window
books_rating = books_rating.withColumn("Price", when(col("Price").cast("double").isNotNull(), col("Price")).otherwise(None))
books_rating.show()

# filling the null values in price column with the price of same books ( some users left the price columns blank)
window_spec = Window.partitionBy("Title").orderBy("Price")
books_rating = books_rating.withColumn("Price", first("Price", True).over(window_spec))
+----------+--------------------+--------------+------------+-----+
|        Id|               Title|       User_id|review/score|Price|
+----------+--------------------+--------------+------------+-----+
|1882931173|Its Only Art If I...| AVCGYZL8FQQTD|         4.0| NULL|
|0826414346|Dr. Seuss: Americ...|A30TK6U7DNS82R|         5.0| NULL|
|0826414346|Dr. Seuss: Americ...|A3UH4UZ4RSVO82|         5.0| NULL|
|0826414346|Dr. Seuss: Americ...|A2MVUWT453QH61|         4.0| NULL|
|0826414346|Dr. Seuss: Americ...|A22X4XUPKF66MR|         4.0| NULL|
|0826414346|Dr. Seuss: Americ...|A2F6NONFUDB6UK|         4.0| NULL|
|0826414346|Dr. Seuss: Americ...|A14OJS0VWMOSWO|         5.0| NULL|
|0826414346|Dr. Seuss: Americ...|A2RSSXTDZDUSH4|         5.0| NULL|
|0826414346|Dr. Seuss: Americ...|A25MD5I2GUIW6W|         5.0| NULL|
|0826414346|Dr. Seuss: Americ...|A3VA4XFS5WNJO3|         4.0| NULL|
|0829814000|Wonderful Worship...| AZ0IOBU20TBOP|         5.0|19.40|
|0829814000|Wonderful Worship...|A373VVEU6Z9M0N|         5.0|19.40|
|0829814000|Wonderful Worship...| AGKGOH65VTRR4|         5.0|19.40|
|0829814000|Wonderful Worship...| A3OQWLU31BU1Y|         5.0|19.40|
|0595344550|Whispers of the W...|A3Q12RK71N74LB|         1.0|10.95|
|0595344550|Whispers of the W...|A1E9M6APK30ZAU|         4.0|10.95|
|0595344550|Whispers of the W...| AUR0VA5H0C66C|         1.0|10.95|
|0595344550|Whispers of the W...|A1YLDZ3VHR6QPZ|         5.0|10.95|
|0595344550|Whispers of the W...| ACO23CG8K8T77|         5.0|10.95|
|0595344550|Whispers of the W...|A1VK81CRRC7MLM|         5.0|10.95|
+----------+--------------------+--------------+------------+-----+
only showing top 20 rows

In [10]:
# Dropping null values

books_rating = books_rating.na.drop()
books_data = books_data.na.drop()
null_counts_rating = {col_name: books_rating.filter(col(col_name).isNull()).count() for col_name in books_rating.columns}
null_counts_data = {col_name: books_data.filter(col(col_name).isNull()).count() for col_name in books_data.columns}
print("Null values in books_rating dataset:", null_counts_rating)
print("Null values in books_data dataset:", null_counts_data)

books_rating.describe().show()
books_data.describe().show()
                                                                                
Null values in books_rating dataset: {'Id': 0, 'Title': 0, 'User_id': 0, 'review/score': 0, 'Price': 0}
Null values in books_data dataset: {'Title': 0, 'authors': 0, 'publisher': 0, 'categories': 0, 'publishedYear': 0}
                                                                                
+-------+--------------------+--------------------+--------------------+--------------------+-----------------+
|summary|                  Id|               Title|             User_id|        review/score|            Price|
+-------+--------------------+--------------------+--------------------+--------------------+-----------------+
|  count|              414180|              414180|              414180|              414180|           414180|
|   mean|1.0516446472336355E9|   1889.688679245283|                NULL|   4.239725294240715|21.62184188033609|
| stddev|1.0441895933760347E9|   124.2255524061775|                NULL|  1.2911124349740133|26.24759579737497|
|    min|          0002554232|"""Beauty Shop-Ph...|A00117421L76WVWG4...|     & Algorithms"""|             1.00|
|    max|          B000TGB9VE|www.whitbread.org...|       AZZZZW74AAX75|teach to understa...|           995.00|
+-------+--------------------+--------------------+--------------------+--------------------+-----------------+

+-------+--------------------+------------------------+--------------------+--------------------+-----------------+
|summary|               Title|                 authors|           publisher|          categories|    publishedYear|
+-------+--------------------+------------------------+--------------------+--------------------+-----------------+
|  count|              112450|                  112450|              112450|              112450|           112450|
|   mean|             1320.25|                    NULL|             51495.0|                NULL|2002.091943085816|
| stddev|    841.971623206931|                    NULL|                NULL|                NULL|14.00102150533584|
|    min|""" We'll Always ...|     The New York Tim...| as shared with D...| Luther Vandross....|             1016|
|    max|you can do anythi...|['黒田領治', 'Ryōji K...|                펜립|          ['Zoning']|             2023|
+-------+--------------------+------------------------+--------------------+--------------------+-----------------+

In [11]:
# Removing special characters

from pyspark.sql.functions import udf, col, split, size, regexp_replace, initcap
from pyspark.sql.types import StringType
import re
from pyspark.sql.functions import year
def remove_special_characters(text):
    if text is not None:
        return re.sub(r'[^\w\s]', '', text)
    else:
        return None
remove_special_characters_udf = udf(remove_special_characters, StringType())
books_data = books_data.withColumn("categories", remove_special_characters_udf("categories"))
books_data = books_data.filter(size(split(col("categories"), " ")) == 1)
books_data = books_data.withColumn("Title", regexp_replace(col("Title"), "\\b\\s+", ""))
books_data = books_data.withColumn("Title", initcap(col("Title")))
books_rating = books_rating.withColumn("Title", initcap(col("Title")))
books_data = books_data.withColumn("categories", initcap(col("categories")))
def remove_integers_and_special_characters(text):
    if text is not None:
        return re.sub(r'[^a-zA-Z\s]', '', text)
    else:
        return None
remove_integers_and_special_characters_udf = udf(remove_integers_and_special_characters, StringType())
books_data = books_data.withColumn("categories", remove_integers_and_special_characters_udf("categories"))
books_data = books_data.filter(books_data["categories"].rlike("^[a-zA-Z\s]+$"))
In [12]:
# Renaming columns for convenience
books_rating = books_rating.withColumnRenamed("review/score", "rating")
books_rating.describe().show()
[Stage 109:======>                                                  (1 + 8) / 9]
+-------+--------------------+--------------------+--------------------+--------------------+-----------------+
|summary|                  Id|               Title|             User_id|              rating|            Price|
+-------+--------------------+--------------------+--------------------+--------------------+-----------------+
|  count|              414180|              414180|              414180|              414180|           414180|
|   mean|1.0516446472336355E9|   1889.688679245283|                NULL|   4.239725294240715|21.62184188033609|
| stddev|1.0441895933760347E9|   124.2255524061775|                NULL|  1.2911124349740133|26.24759579737497|
|    min|          0002554232|"""beauty Shop-ph...|A00117421L76WVWG4...|     & Algorithms"""|             1.00|
|    max|          B000TGB9VE|Zulu Shaman: Drea...|       AZZZZW74AAX75|teach to understa...|           995.00|
+-------+--------------------+--------------------+--------------------+--------------------+-----------------+

                                                                                
In [13]:
# Count the number of ratings per user
user_id_counts = books_rating.groupBy("User_id").agg(count("*").alias("count"))
user_id_counts= user_id_counts.orderBy(user_id_counts["count"].desc())
user_id_counts.show(20)
result = user_id_counts.groupBy((col('count') / 10).cast('int').alias('range')).count().orderBy('range')
result.show()
                                                                                
+--------------+-----+
|       User_id|count|
+--------------+-----+
|A14OJS0VWMOSWO| 2105|
|   AFVQZQ8PW0L|  606|
| AG35NEEFCMQVR|  307|
|A1M8PP7MLHNBQB|  278|
|A1D2C0WDCSHUWZ|  271|
| AHD101501WCN1|  242|
|A2VKWLCNZF4ZVB|  205|
|A1NATT3PN24QWY|  200|
|A1K1JW1C5CUSUZ|  179|
|A1X8VZWTOG8IS6|  174|
|A1S3C5OFU508P3|  158|
|A3M174IC0VXOS2|  152|
|A2EDZH51XHFA9B|  147|
|A2VE83MZF98ITY|  143|
|A21NVBFIEQWDSG|  142|
|A2NJO6YE954DBH|  141|
|A2OJW07GQRNJUT|  129|
|A2F6N60Z96CAJI|  118|
|A1OX82JPAQLL60|  113|
|A1G37DFO8MQW0M|  112|
+--------------+-----+
only showing top 20 rows

[Stage 121:============>                                            (2 + 7) / 9]
+-----+------+
|range| count|
+-----+------+
|    0|303448|
|    1|  1213|
|    2|   250|
|    3|   126|
|    4|    47|
|    5|    33|
|    6|    16|
|    7|    10|
|    8|     6|
|    9|     7|
|   10|     8|
|   11|     5|
|   12|     1|
|   14|     4|
|   15|     2|
|   17|     2|
|   20|     2|
|   24|     1|
|   27|     2|
|   30|     1|
+-----+------+
only showing top 20 rows

                                                                                
In [14]:
# Filter out users that has invalid user_id
user_id_counts = user_id_counts.filter(col("user_id").rlike("^[a-zA-Z0-9]+$"))
user_id_counts.show(20)

# Filter users who has given the ratings
filtered_user_ids = user_id_counts.filter(col("count") >= 1).select("User_id")
filtered_user_ids.describe().show()
                                                                                
+--------------+-----+
|       User_id|count|
+--------------+-----+
|A14OJS0VWMOSWO| 2105|
|   AFVQZQ8PW0L|  606|
| AG35NEEFCMQVR|  307|
|A1M8PP7MLHNBQB|  278|
|A1D2C0WDCSHUWZ|  271|
| AHD101501WCN1|  242|
|A2VKWLCNZF4ZVB|  205|
|A1NATT3PN24QWY|  200|
|A1K1JW1C5CUSUZ|  179|
|A1X8VZWTOG8IS6|  174|
|A1S3C5OFU508P3|  158|
|A3M174IC0VXOS2|  152|
|A2EDZH51XHFA9B|  147|
|A2VE83MZF98ITY|  143|
|A21NVBFIEQWDSG|  142|
|A2NJO6YE954DBH|  141|
|A2OJW07GQRNJUT|  129|
|A2F6N60Z96CAJI|  118|
|A1OX82JPAQLL60|  113|
|A281NPSIMI1C2R|  112|
+--------------+-----+
only showing top 20 rows

[Stage 147:>                                                        (0 + 1) / 1]
+-------+--------------------+
|summary|             User_id|
+-------+--------------------+
|  count|              305186|
|   mean|                NULL|
| stddev|                NULL|
|    min|A00117421L76WVWG4...|
|    max|       AZZZZW74AAX75|
+-------+--------------------+

                                                                                
In [15]:
# Join to filter the ratings
filtered_ratings = books_rating.join(filtered_user_ids, "User_id", "inner")
In [16]:
# Merging both datasets
books_merged = filtered_ratings.join(books_data, "Title", "inner")
In [17]:
# Ensure the rating column is numeric
books_final = books_merged.withColumn("rating", col("rating").cast("double"))

books_final.show()
                                                                                
+-----------------+--------------+----------+------+-----+--------------------+--------------------+----------+-------------+
|            Title|       User_id|        Id|rating|Price|             authors|           publisher|categories|publishedYear|
+-----------------+--------------+----------+------+-----+--------------------+--------------------+----------+-------------+
|          Herland|A101DG7P9E26PW|1419123548|   4.0|17.12|['Charlotte Perki...|     Xist Publishing|   Fiction|         2015|
|          Herland|A101DG7P9E26PW|1421810182|   4.0|17.12|['Charlotte Perki...|     Xist Publishing|   Fiction|         2015|
|       Spellbound|A10VOEBL5S337W|014250193X|   4.0| 5.99|  ['James Essinger']|               Delta|   History|         2007|
|        Plainsong|A111DVWFAZPOO1|0375705856|   1.0|10.20|      ['Kent Haruf']|             Vintage|   Fiction|         2001|
|         Hannibal|A11ZDEVYIMC6AI|0440224675|   2.0| 7.67|   ['Thomas Harris']|                Dell|   Fiction|         2009|
|         Hannibal|A11ZDEVYIMC6AI|0440224675|   2.0| 7.67|   ['Thomas Harris']|                Dell|   Fiction|         2009|
|    Riddle-master|A11ZRTRRYD9P2W|0441005969|   5.0|12.92|['Patricia A. McK...|             Penguin|   Fiction|         1999|
|              Vox|A1241U6QCSX5YJ|0679742115|   3.0|10.95|['Christina Dalch...|            Boekerij|   Fiction|         2019|
|     Insurrection|A124MIARA7T5J9|0671720244|   3.0| 7.56|   ['Peter Rollins']|  Simon and Schuster|  Religion|         2011|
|           Hamlet|A125T7DGLW3GPP|0764120840|   5.0| 7.06|['William Shakesp...|     Bantam Classics|     Drama|         1988|
|Lovin' Mrs. Jones|A129029D0E7ADN|0972458603|   3.0|11.66|['Edward Dean Arn...|Pearlstone Pub In...|   Fiction|         2003|
|    Riddle-master|A12HYMIF3856GN|0441005969|   5.0|12.92|['Patricia A. McK...|             Penguin|   Fiction|         1999|
|  Superpatriotism|A12J7R7GAJ5FBZ|0872864332|   5.0| 9.95| ['Michael Parenti']|   City Lights Books|   History|         2004|
|       Providence|A12J7R7GAJ5FBZ|0595197809|   5.0|15.95| ['Caroline Kepnes']|               Lenny|   Fiction|         2018|
|          Potluck|A12UQKP0MU09LN|1892590379|   5.0|14.95|['Kristin Donnelly']|     Clarkson Potter|   Cooking|         2016|
|           Drakon|A12X1VN7QQNPPC|0671877119|   3.0| 5.99|    ['N.J. Walters']|    Entangled: Amara|   Fiction|         2018|
|       Bellwether|A131JVFO4PJKPE|0553562967|   5.0| 7.56|   ['Connie Willis']|             Spectra|   Fiction|         2010|
|           Hamlet|A13A206PIDTBQK|0764120840|   5.0| 7.06|['William Shakesp...|     Bantam Classics|     Drama|         1988|
|        Plainsong|A13F2H4DIX231F|0375705856|   5.0|10.20|      ['Kent Haruf']|             Vintage|   Fiction|         2001|
|     Insurrection|A13WA3SCM778LO|0671720244|   3.0| 7.56|   ['Peter Rollins']|  Simon and Schuster|  Religion|         2011|
+-----------------+--------------+----------+------+-----+--------------------+--------------------+----------+-------------+
only showing top 20 rows

In [18]:
books_final.describe().show()
[Stage 180:=====================>                                   (3 + 5) / 8]
+-------+-----+--------------+--------------------+------------------+------------------+--------------------+----------------+----------+-----------------+
|summary|Title|       User_id|                  Id|            rating|             Price|             authors|       publisher|categories|    publishedYear|
+-------+-----+--------------+--------------------+------------------+------------------+--------------------+----------------+----------+-----------------+
|  count| 6600|          6600|                6600|              6571|              6600|                6600|            6600|      6600|             6600|
|   mean| NULL|          NULL| 6.845429041481897E8| 3.721503576320195|18.180798484848523|                NULL|            NULL|      NULL|2007.634696969697|
| stddev| NULL|          NULL|4.6595010557118016E8|1.4230303776831768|55.546915116959966|                NULL|            NULL|      NULL|6.783271168969406|
|    min|  51a|A100I0T791DIKS|          0007104022|               1.0|             10.00|"[""John O'Loughl...|  010 Publishers|   Animals|             1943|
|    max|Zoom!| AZYIWJ4P9VZVE|          B000MTEKTQ|               5.0|             96.80|   ['Yasushi Inoue']|powerHouse Books|    Travel|             2022|
+-------+-----+--------------+--------------------+------------------+------------------+--------------------+----------------+----------+-----------------+

                                                                                
In [19]:
import matplotlib.pyplot as plt
import numpy as np
import plotly.graph_objs as go
import pandas as pd
temp_df = books_final.groupBy("rating").count().toPandas()

# Create trace1 for bar chart
trace1 = go.Bar(
    x=temp_df['rating'],
    y=temp_df['count'],
    text = temp_df['count'],  # Labels for each bar
    marker=dict(color='rgb(255,165,0)', line=dict(color='rgb(0,0,0)', width=1.5))
)
layout_bar = go.Layout(
    template="plotly_dark",
    title='RATINGS COUNT',
    xaxis=dict(title='Rating'),
    yaxis=dict(title='Count')
)
fig_bar = go.Figure(data=[trace1], layout=layout_bar)
fig_bar.show()
                                                                                
In [20]:
import matplotlib.pyplot as plt
import numpy as np
import plotly.graph_objs as go
import pandas as pd

# Filter out null values
temp_df_filtered = temp_df.dropna(subset=['rating'])

def pie_plot(cnt_srs, title):
    labels = cnt_srs['rating']
    values = cnt_srs['count']
    trace = go.Pie(
        labels=labels,
        values=values,
        hoverinfo='percent+value',
        textinfo='percent',
        textposition='inside',
        hole=0.7,
        showlegend=True,
        marker=dict(
            colors=plt.cm.viridis_r(np.linspace(0, 1, len(cnt_srs))),
            line=dict(color='#000000', width=2)
        )
    )
    layout = go.Layout(
        template="plotly_dark",
        title=title
    )
    fig = go.Figure(data=[trace], layout=layout)
    return fig

fig_pie = pie_plot(temp_df_filtered, 'Rating Distribution')
fig_pie.show()
In [21]:
from pyspark.sql.functions import col, when
# Split the categories column into Fiction and Non-Fiction
genre_counts = books_final.withColumn("Fiction", when(col("categories") == "Fiction", 1).otherwise(0)) \
    .withColumn("Non-Fiction", when(col("categories") != "Fiction", 1).otherwise(0)) \
    .groupBy().sum("Fiction", "Non-Fiction") \
    .withColumnRenamed("sum(Fiction)", "Fiction") \
    .withColumnRenamed("sum(Non-Fiction)", "Non-Fiction")
genre_counts_pd = genre_counts.toPandas()
fiction_label = genre_counts_pd.iloc[0]["Fiction"]
non_fiction_label = genre_counts_pd.iloc[0]["Non-Fiction"]

trace1 = go.Bar(
    x=["Fiction", "Non-Fiction"],
    y=[fiction_label, non_fiction_label],
    text=[fiction_label, non_fiction_label],  # Labels for each bar
    marker=dict(color='rgb(255,165,0)',
                line=dict(color='rgb(0,0,0)', width=1.5))
)
layout = go.Layout(template="plotly_dark", title = 'Count of ratings among Fiction and Non-Fiction Books', xaxis=dict(title='Categories'), yaxis=dict(title='Count'))
fig = go.Figure(data=[trace1], layout=layout)
fig.show()
                                                                                
In [22]:
import plotly.graph_objs as go
from pyspark.sql.functions import col, when

# Split the categories column into Fiction and Non-Fiction
genre_counts = books_final.withColumn("Category", when(col("categories") == "Fiction", "Fiction").otherwise("Non-Fiction"))
genre_counts_pie = genre_counts.groupBy('Category').count()
genre_counts_pd = genre_counts_pie.toPandas()
def pie_plot(cnt_srs, title):
    labels = cnt_srs['Category']
    values = cnt_srs['count']
    trace = go.Pie(
        labels=labels,
        values=values,
        hoverinfo='percent+value',
        textinfo='percent',
        textposition='inside',
        hole=0.7,
        showlegend=True,
        marker=dict(
            colors=plt.cm.viridis_r(np.linspace(0, 1, len(cnt_srs))),
            line=dict(color='#000000', width=2)
        )
    )
    layout = go.Layout(
        template="plotly_dark",
        title=title
    )
    fig = go.Figure(data=[trace], layout=layout)
    return fig
fig_pie = pie_plot(genre_counts_pd, 'GENRE Distribution')
fig_pie.show()
                                                                                
In [23]:
!pip install bubbly
DEPRECATION: Loading egg at /Users/soumitra7/anaconda3/lib/python3.11/site-packages/pyBWMD-0.0.1-py3.11.egg is deprecated. pip 23.3 will enforce this behaviour change. A possible replacement is to use pip for package installation..
Requirement already satisfied: bubbly in /Users/soumitra7/anaconda3/lib/python3.11/site-packages (1.0.2)
Requirement already satisfied: plotly in /Users/soumitra7/anaconda3/lib/python3.11/site-packages (from bubbly) (5.9.0)
Requirement already satisfied: pandas in /Users/soumitra7/anaconda3/lib/python3.11/site-packages (from bubbly) (2.0.3)
Requirement already satisfied: python-dateutil>=2.8.2 in /Users/soumitra7/anaconda3/lib/python3.11/site-packages (from pandas->bubbly) (2.8.2)
Requirement already satisfied: pytz>=2020.1 in /Users/soumitra7/anaconda3/lib/python3.11/site-packages (from pandas->bubbly) (2023.3.post1)
Requirement already satisfied: tzdata>=2022.1 in /Users/soumitra7/anaconda3/lib/python3.11/site-packages (from pandas->bubbly) (2023.3)
Requirement already satisfied: numpy>=1.21.0 in /Users/soumitra7/anaconda3/lib/python3.11/site-packages (from pandas->bubbly) (1.24.3)
Requirement already satisfied: tenacity>=6.2.0 in /Users/soumitra7/anaconda3/lib/python3.11/site-packages (from plotly->bubbly) (8.2.2)
Requirement already satisfied: six>=1.5 in /Users/soumitra7/anaconda3/lib/python3.11/site-packages (from python-dateutil>=2.8.2->pandas->bubbly) (1.16.0)
In [24]:
from pyspark.sql import functions as F
from plotly.subplots import make_subplots
import plotly.graph_objs as go
import pandas as pd
df1 = books_final.groupBy(['categories', 'publishedYear']).agg(F.mean('rating').alias('User Rating'), F.mean('Price').alias('Price')).toPandas()
fig = make_subplots(rows=1, cols=1)
for genre in df1['categories'].unique():
    df_genre = df1[df1['categories'] == genre]
    fig.add_trace(go.Scatter(
        x=df_genre['User Rating'],
        y=df_genre['Price'],
        mode='markers',
        marker=dict(size=10),
        name=genre
    ))
fig.update_layout(
    template="plotly_dark",
    title='Avg price and rating of various categories',
    xaxis_title='User Rating',
    yaxis_title='Avg Price',
    xaxis=dict(type='log'),  # Log scale for x-axis
    showlegend=True
)
fig.show()
                                                                                
In [25]:
from pyspark.sql import functions as F
from plotly.subplots import make_subplots
import plotly.graph_objs as go
import pandas as pd
df1 = books_final.groupBy(['publishedYear']).agg(F.mean('rating').alias('User Rating'), F.mean('Price').alias('Price')).toPandas()
fig = make_subplots(rows=1, cols=1)
for year in df1['publishedYear'].unique():
    df_genre = df1[df1['publishedYear'] == year]
    fig.add_trace(go.Scatter(
        x=df_genre['User Rating'],
        y=df_genre['Price'],
        mode='markers',
        marker=dict(size=10),
        name=str(year)
    ))
fig.update_layout(
    template="plotly_dark",
    title='Avg price and rating over the years',
    xaxis_title='User Rating',
    yaxis_title='Avg Price',
    xaxis=dict(type='log'),  # Log scale for x-axis
    showlegend=True
)
fig.show()
                                                                                
In [26]:
# Group by Author and count occurrences
top_authors = books_final.groupBy('authors').agg(F.avg('rating').alias("author_rating")) \
    .orderBy('author_rating', ascending=False).limit(10)
top_authors_pd = top_authors.toPandas()
trace1 = go.Bar(
    x=top_authors_pd['authors'],
    y=top_authors_pd['author_rating'],
    marker=dict(color='rgb(255,165,0)',
                line=dict(color='rgb(0,0,0)', width=1.5))
)
layout = go.Layout(
    template="plotly_dark",
    title='TOP 10 AUTHORS WITH HIGH AVERAGE RATING',
    xaxis=dict(title='Author', tickangle=45),
    yaxis=dict(title='Avg rating')
)
fig = go.Figure(data=[trace1], layout=layout)
fig.show()
                                                                                
In [27]:
# Group by Author and count occurrences
top_publishers = books_final.groupBy('publisher').agg(F.avg('rating').alias("publisher_rating")) \
    .orderBy('publisher_rating', ascending=False).limit(10)
top_publishers_pd = top_publishers.toPandas()
trace1 = go.Bar(
    x=top_publishers_pd['publisher'],
    y=top_publishers_pd['publisher_rating'],
    marker=dict(color='rgb(255,165,0)',
                line=dict(color='rgb(0,0,0)', width=1.5))
)
layout = go.Layout(
    template="plotly_dark",
    title='TOP 10 PUBLISHERS WITH HIGH AVERAGE RATING',
    xaxis=dict(title='publisher', tickangle=45),
    yaxis=dict(title='Avg rating')
)
fig = go.Figure(data=[trace1], layout=layout)
fig.show()
                                                                                
In [28]:
temp_df1_year = books_final.groupBy('publishedYear').mean()
temp_df1_year_pd = temp_df1_year.toPandas()
trace1_year = go.Bar(
    x=temp_df1_year_pd['publishedYear'],
    y=temp_df1_year_pd['avg(rating)'],
    marker=dict(color='rgb(255,165,0)',
                line=dict(color='rgb(0,0,0)',width=1.5))
)
layout_year = go.Layout(
    template="plotly_dark",
    title='AVERAGE REVIEWS OVER THE YEARS',
    xaxis=dict(title='Year'),
    yaxis=dict(title='count')
)
fig_year = go.Figure(data=[trace1_year], layout=layout_year)
fig_year.show()
                                                                                
In [29]:
from pyspark.sql import functions as F
import plotly.graph_objs as go
import pandas as pd

# Group by publishedYear and calculate the mean of price
temp_df_year = books_final.groupBy('publishedYear').agg(F.mean('Price').alias('avg_price'))
temp_df_year_pd = temp_df_year.toPandas()
trace1_year = go.Bar(
    x=temp_df_year_pd['publishedYear'],
    y=temp_df_year_pd['avg_price'],
    marker=dict(color='rgb(148, 103, 189)',
                line=dict(color='rgb(0,0,0)', width=1.5))
)
layout_year = go.Layout(
    template="plotly_dark",
    title='AVERAGE PRICE OVER THE YEARS',
    xaxis=dict(title='Year'),
    yaxis=dict(title='Average Price')
)
fig_year = go.Figure(data=[trace1_year], layout=layout_year)
fig_year.show()
                                                                                
In [30]:
books_final.show(20)
24/05/09 23:00:10 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
                                                                                
+-----------------+--------------+----------+------+-----+--------------------+--------------------+----------+-------------+
|            Title|       User_id|        Id|rating|Price|             authors|           publisher|categories|publishedYear|
+-----------------+--------------+----------+------+-----+--------------------+--------------------+----------+-------------+
|          Herland|A101DG7P9E26PW|1419123548|   4.0|17.12|['Charlotte Perki...|     Xist Publishing|   Fiction|         2015|
|          Herland|A101DG7P9E26PW|1421810182|   4.0|17.12|['Charlotte Perki...|     Xist Publishing|   Fiction|         2015|
|       Spellbound|A10VOEBL5S337W|014250193X|   4.0| 5.99|  ['James Essinger']|               Delta|   History|         2007|
|        Plainsong|A111DVWFAZPOO1|0375705856|   1.0|10.20|      ['Kent Haruf']|             Vintage|   Fiction|         2001|
|         Hannibal|A11ZDEVYIMC6AI|0440224675|   2.0| 7.67|   ['Thomas Harris']|                Dell|   Fiction|         2009|
|         Hannibal|A11ZDEVYIMC6AI|0440224675|   2.0| 7.67|   ['Thomas Harris']|                Dell|   Fiction|         2009|
|    Riddle-master|A11ZRTRRYD9P2W|0441005969|   5.0|12.92|['Patricia A. McK...|             Penguin|   Fiction|         1999|
|              Vox|A1241U6QCSX5YJ|0679742115|   3.0|10.95|['Christina Dalch...|            Boekerij|   Fiction|         2019|
|     Insurrection|A124MIARA7T5J9|0671720244|   3.0| 7.56|   ['Peter Rollins']|  Simon and Schuster|  Religion|         2011|
|           Hamlet|A125T7DGLW3GPP|0764120840|   5.0| 7.06|['William Shakesp...|     Bantam Classics|     Drama|         1988|
|Lovin' Mrs. Jones|A129029D0E7ADN|0972458603|   3.0|11.66|['Edward Dean Arn...|Pearlstone Pub In...|   Fiction|         2003|
|    Riddle-master|A12HYMIF3856GN|0441005969|   5.0|12.92|['Patricia A. McK...|             Penguin|   Fiction|         1999|
|  Superpatriotism|A12J7R7GAJ5FBZ|0872864332|   5.0| 9.95| ['Michael Parenti']|   City Lights Books|   History|         2004|
|       Providence|A12J7R7GAJ5FBZ|0595197809|   5.0|15.95| ['Caroline Kepnes']|               Lenny|   Fiction|         2018|
|          Potluck|A12UQKP0MU09LN|1892590379|   5.0|14.95|['Kristin Donnelly']|     Clarkson Potter|   Cooking|         2016|
|           Drakon|A12X1VN7QQNPPC|0671877119|   3.0| 5.99|    ['N.J. Walters']|    Entangled: Amara|   Fiction|         2018|
|       Bellwether|A131JVFO4PJKPE|0553562967|   5.0| 7.56|   ['Connie Willis']|             Spectra|   Fiction|         2010|
|           Hamlet|A13A206PIDTBQK|0764120840|   5.0| 7.06|['William Shakesp...|     Bantam Classics|     Drama|         1988|
|        Plainsong|A13F2H4DIX231F|0375705856|   5.0|10.20|      ['Kent Haruf']|             Vintage|   Fiction|         2001|
|     Insurrection|A13WA3SCM778LO|0671720244|   3.0| 7.56|   ['Peter Rollins']|  Simon and Schuster|  Religion|         2011|
+-----------------+--------------+----------+------+-----+--------------------+--------------------+----------+-------------+
only showing top 20 rows

In [31]:
# Pivoting the DataFrame
book_pivot = books_final.groupBy("Title").pivot("User_id").avg("rating")

# Fill Null values with 0
book_pivot = book_pivot.na.fill(0)

# Convert pivot table to RDD of (Title, features) tuples
sparse_rdd = book_pivot.rdd.map(lambda row: (row[0], Vectors.dense(row[1:])))

# Define a schema for the RDD
schema = ["Title", "features"]

# Create a DataFrame from the RDD
sparse_matrix = spark.createDataFrame(sparse_rdd, schema)
sparse_matrix.show()

# Normalizing the features
normalizer = Normalizer(inputCol="features", outputCol="norm_features")
norm_features = normalizer.transform(sparse_matrix)

# Creating a Bucketed Random Projection LSH model
brp = BucketedRandomProjectionLSH(inputCol="norm_features", outputCol="hashes", bucketLength=1.0, numHashTables=10)
model = brp.fit(norm_features)
24/05/09 23:00:44 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/09 23:00:44 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/09 23:00:45 WARN DAGScheduler: Broadcasting large task binary with size 1435.8 KiB
24/05/09 23:00:51 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
24/05/09 23:00:55 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
                                                                                
+-----------------+--------------------+
|            Title|            features|
+-----------------+--------------------+
|          Nemesis|[0.0,0.0,0.0,0.0,...|
|             Bite|[0.0,0.0,1.0,0.0,...|
|            Mercy|[0.0,0.0,0.0,0.0,...|
|          Romance|[0.0,0.0,0.0,0.0,...|
|           Guyana|[0.0,0.0,0.0,0.0,...|
|           Hostas|[5.0,0.0,0.0,0.0,...|
|          Believe|[0.0,0.0,0.0,0.0,...|
|        Rockbound|[0.0,0.0,0.0,0.0,...|
|         Restoree|[0.0,0.0,0.0,0.0,...|
|   Approaching...|[0.0,0.0,0.0,0.0,...|
|    Riddle-master|[0.0,0.0,0.0,0.0,...|
|      Borderlands|[0.0,0.0,0.0,0.0,...|
|           Nettie|[0.0,0.0,0.0,0.0,...|
|              Art|[0.0,0.0,0.0,0.0,...|
|Lighthousekeeping|[0.0,0.0,0.0,0.0,...|
|           Saucer|[0.0,0.0,0.0,0.0,...|
|          Herland|[0.0,4.0,0.0,0.0,...|
|       Grievances|[0.0,0.0,0.0,0.0,...|
|           Jurgen|[0.0,0.0,0.0,0.0,...|
|         Rawsome!|[0.0,0.0,0.0,0.0,...|
+-----------------+--------------------+
only showing top 20 rows

24/05/09 23:00:58 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
24/05/09 23:01:00 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
In [32]:
#Provide input book title from the user
user_input_title = input("Enter the title of the book: ")
user_input_title = user_input_title.title()
Enter the title of the book: bite
In [33]:
# Check if the input book title is in the DataFrame
if book_pivot.filter(col("Title") == user_input_title).count() == 0:
    print(f"Book '{user_input_title}' not found.")
else:
    # Extracting the features of the input book
    input_book_features = norm_features.filter(col("Title") == user_input_title).select("norm_features").collect()[0][0]

# Approximate k nearest neighbors of the input book
knn = model.approxNearestNeighbors(norm_features, input_book_features, 6)
# Display the recommended books with their IDs
recommended_books = knn.filter(col("Title") != user_input_title).limit(5).collect()

for book in recommended_books:
    print(book[0])
24/05/09 23:01:45 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
24/05/09 23:01:48 WARN DAGScheduler: Broadcasting large task binary with size 4.6 MiB
[Stage 465:>                                                        (0 + 1) / 1]
Jumpmetrics
Dreamspy
Scarab-4
Kunma
Sensation
                                                                                
In [ ]: